26 实践课-多工具组合调用逻辑设计
多工具组合调用逻辑设计
关联:索引
术语小抄(初学者版)
-
串行(Serial):工具按固定顺序一个接一个执行,上一个成功才执行下一个。
-
并行(Parallel):多个工具同时执行,等待全部(或关键部分)完成后再合并结果。
-
编排(Orchestration):规定“什么时候调用哪些工具、并发多少、失败怎么办、怎么合并”的流程控制层。
-
Join(汇合):并行分支执行完,把多份结果合并回同一个状态的步骤。
-
规范化返回(Normalized Result):把不同工具的输出统一成同一种结构,便于测试、审计、回归与复盘。
-
标注数据(Labeled Data):人工给出的“输入—期望输出/期望调用序列”的样本,用于验证系统的准确性与一致性。
-
先修:至少完成 13 的工具契约;建议做过 15/16(设备控制)或 18(标注数据/测试)。
作业:布置(见文末)
- 多工具系统里,“工具是否能跑”只是起点,真正影响交付的是“多个工具能否稳定协同”:顺序正确、失败可控、结果可复验、控制可审计。
- 工业分拣链路里,设备控制工具必须放在最后;只要上游结果不可信或不完整,就应该拒绝控制并给出可解释原因。
- 为什么“视觉识别失败”时不能直接尝试机械臂控制?
- 如果并行调用了两个工具,一个成功一个失败,你的系统应当返回成功还是失败?为什么?
1)串行:适用于“强依赖链”
分拣主链路属于强依赖:
-
视觉识别输出(物体类别/抓取点位)是解析指令与控制动作的前置条件。
-
指令解析输出(目标格口/动作类型/安全限制)是控制工具入参的前置条件。
-
上一步失败:立即停止后续工具,返回结构化错误与 trace_id。
-
上一步成功但“不可信”:也要停止(例如置信度低/缺关键字段),避免“带病下发”。
2)并行:适用于“弱依赖 + 可合并”
并行适用的典型情况:
-
一个主链路(视觉→解析→控制)之外,还要同时做“安全检查/权限校验/知识检索/日志落盘”,它们彼此独立,最后合并即可。
-
先定义 Join 规则:哪些结果是“必须成功”,哪些是“可选但要记录”。
-
失败传播:关键分支失败则整体失败;非关键分支失败则降级并记录到
warnings。 -
统一格式:所有分支都返回规范化结构,Join 才能写得简单、可测。
本讲全程统一的“规范化返回格式”(建议你们组后续所有工具/编排都对齐):
{
"ok": true,
"trace_id": "a1b2c3d4",
"tool": "vision_detect",
"ts_ms": 1710000000000,
"data": {},
"error": null
}
解释与自检要点:
-
ok:只表示本次调用是否成功,不要混入“业务是否完成”的含义。 -
trace_id:一次编排中的所有工具调用建议共享同一个 trace_id(或在子调用中派生),便于串联证据链。 -
tool:明确来自哪个工具/哪一步(并行 Join 时尤其重要)。 -
ts_ms:毫秒时间戳,用于排序与审计。 -
data:成功结果放这里,字段必须稳定、可回归断言。 -
error:失败时必须是结构化对象(建议含code/message/detail),不要只返回一句话。 -
合并只做两件事:写入“状态(state)”与生成“下一步入参(payload)”;不要把业务逻辑散落在各处。
-
每一步都把关键字段“规范化到同一口径”,例如
target_bin、item_type、action、safety。 -
控制类工具(机械臂)执行前必须二次校验:权限/场景/参数范围/置信度阈值。
把下面代码保存为 orchestrator_demo.py,然后运行。示例工具用的是“可运行的桩函数”,目的是把编排结构与返回规范先跑通,后续再替换成你们真实工具(视觉模型、解析器、ROS2 控制)。
from __future__ import annotations
# 这是一个“多工具编排层”的最小可运行示例:
# - 用 3 个核心工具模拟分拣主链路:视觉识别 → 指令解析 → 机械臂控制(串行)
# - 在控制工具前加入门禁:safety_check(置信度阈值)
# - 所有工具都返回统一的结构(规范化返回),便于 Join、测试、审计、回归
# 你后续替换成真实工具(视觉模型/解析器/ROS2 控制)时,尽量只替换工具函数体,保留返回结构与编排骨架。
import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
def now_ms() -> int:
# 统一生成毫秒时间戳:用于日志排序、审计对齐、问题回放定位
return int(time.time() * 1000)
def ok_result(*, tool: str, trace_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
# 规范化成功返回:
# - ok=true 表示“这一步工具调用成功”,不等价于“业务完成”
# - tool/trace_id/ts_ms 是审计与串联证据链的关键字段
return {"ok": True, "tool": tool, "trace_id": trace_id, "ts_ms": now_ms(), "data": data, "error": None}
def err_result(*, tool: str, trace_id: str, code: str, message: str, detail: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# 规范化失败返回:
# - code:机器可判断的错误码(用于统计/报警/回归断言)
# - message:给人看的简短原因
# - detail:补充信息(缺哪些字段、阈值是多少等),便于复现与修复
return {
"ok": False,
"tool": tool,
"trace_id": trace_id,
"ts_ms": now_ms(),
"data": {},
"error": {"code": code, "message": message, "detail": detail or {}},
}
def require_fields(obj: Dict[str, Any], fields: List[str]) -> Tuple[bool, List[str]]:
# 统一的“必填字段检查”:把业务字段缺失变成可解释、可复现的错误
missing = [f for f in fields if obj.get(f) in (None, "", [])]
return (len(missing) == 0), missing
async def tool_vision_detect(*, image_path: str, trace_id: str) -> Dict[str, Any]:
# 工具 1:视觉识别(桩)
# 真实场景中这里会做:图像读取/预处理/模型推理/结果后处理
tool = "vision_detect"
p = (image_path or "").strip()
if not p:
# 输入不合规:直接拒绝,避免后续工具“带病执行”
return err_result(tool=tool, trace_id=trace_id, code="INPUT_EMPTY", message="image_path is empty")
# 用 sleep 模拟“工具耗时”;真实项目里可以替换为实际推理时间
await asyncio.sleep(0.05)
# 这里用非常简化的规则模拟识别结果(只用于课堂演示编排结构)
item_type = "apple" if "apple" in p.lower() else "unknown"
confidence = 0.92 if item_type == "apple" else 0.30
return ok_result(tool=tool, trace_id=trace_id, data={"item_type": item_type, "confidence": confidence})
async def tool_parse_instruction(*, instruction: str, trace_id: str) -> Dict[str, Any]:
# 工具 2:指令解析(桩)
# 真实场景中这里会做:意图识别/槽位抽取/约束提取/参数标准化
tool = "parse_instruction"
ins = (instruction or "").strip()
if not ins:
return err_result(tool=tool, trace_id=trace_id, code="INPUT_EMPTY", message="instruction is empty")
await asyncio.sleep(0.03)
lower = ins.lower()
# 解析“目标格口”:
# - 只匹配明确模式,避免出现 "place" 这种英文词导致误判(含字母 a/b)
if ("a格口" in lower) or ("bin_a" in lower) or ("slot_a" in lower):
target_bin = "bin_a"
elif ("b格口" in lower) or ("bin_b" in lower) or ("slot_b" in lower):
target_bin = "bin_b"
else:
target_bin = "bin_unknown"
# 解析“动作类型”:
# - 工业控制类动作必须有限枚举,unknown 一律不允许控制执行
action = "place" if ("place" in lower or "放" in ins) else "unknown"
return ok_result(tool=tool, trace_id=trace_id, data={"action": action, "target_bin": target_bin})
async def tool_arm_control(*, action: str, target_bin: str, item_type: str, trace_id: str) -> Dict[str, Any]:
# 工具 3:机械臂控制(桩)
# 真实场景中这里会对接 ROS2/PLC/设备 SDK;课堂里用桩函数强调两件事:
# 1)控制类工具必须最后执行
# 2)执行前必须做门禁与参数二次校验(宁可拒绝也不误控)
tool = "arm_control"
ok1, missing = require_fields({"action": action, "target_bin": target_bin, "item_type": item_type}, ["action", "target_bin", "item_type"])
if not ok1:
return err_result(tool=tool, trace_id=trace_id, code="INPUT_MISSING", message="missing required fields", detail={"missing": missing})
await asyncio.sleep(0.04)
# 安全拒绝策略:只要关键参数不确定,就不下发控制
if target_bin == "bin_unknown" or action == "unknown" or item_type == "unknown":
return err_result(tool=tool, trace_id=trace_id, code="SAFETY_REJECT", message="unsafe to execute control with unknown parameters")
# 模拟“控制执行回执”:工业场景里这个回执必须可追溯、可审计
return ok_result(tool=tool, trace_id=trace_id, data={"ack": True, "executed": {"action": action, "target_bin": target_bin, "item_type": item_type}})
async def tool_safety_check(*, confidence: float, trace_id: str) -> Dict[str, Any]:
# 门禁工具(示例):用置信度做阈值门禁
# 真实场景可扩展:权限校验、二次确认、速度上限、危险动作白名单等
tool = "safety_check"
await asyncio.sleep(0.01)
allow_control = float(confidence) >= 0.85
return ok_result(tool=tool, trace_id=trace_id, data={"allow_control": allow_control, "min_confidence": 0.85})
@dataclass
class PipelineOutput:
# 编排层输出结构:
# - steps:保留每一步工具的“规范化返回”,形成证据链
# - final:对外给业务的关键结果(成功时给关键字段,失败时给 reason)
ok: bool
trace_id: str
steps: List[Dict[str, Any]]
final: Dict[str, Any]
warnings: List[Dict[str, Any]]
def to_dict(self) -> Dict[str, Any]:
return {
"ok": self.ok,
"trace_id": self.trace_id,
"steps": self.steps,
"final": self.final,
"warnings": self.warnings,
}
async def orchestrate_sorting(*, image_path: str, instruction: str) -> PipelineOutput:
# 编排主函数(串行主链路):
# - 严格按顺序执行:vision → parse → safety → control
# - 任一步失败:立刻停止并返回(fail-fast),避免“错误扩散”
trace_id = uuid.uuid4().hex[:8]
steps: List[Dict[str, Any]] = []
warnings: List[Dict[str, Any]] = []
# Step 1:视觉识别
vision = await tool_vision_detect(image_path=image_path, trace_id=trace_id)
steps.append(vision)
if not vision["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "vision_failed"}, warnings=warnings)
# Step 2:指令解析(依赖视觉/业务上下文时,也可以把 vision 结果作为输入的一部分传入)
parse = await tool_parse_instruction(instruction=instruction, trace_id=trace_id)
steps.append(parse)
if not parse["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "parse_failed"}, warnings=warnings)
# Step 3:门禁(控制前必经)
# 这里的门禁只用置信度演示;真实系统还应加入权限/场景/参数范围等门禁
safety = await tool_safety_check(confidence=float(vision["data"]["confidence"]), trace_id=trace_id)
steps.append(safety)
if not safety["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "safety_check_failed"}, warnings=warnings)
if not bool(safety["data"]["allow_control"]):
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "confidence_too_low"}, warnings=warnings)
# Step 4:机械臂控制(必须最后执行)
# 通过“上游输出 → 下游入参”的方式传参,避免出现字段名不一致与不可追踪的隐式依赖
control = await tool_arm_control(
action=str(parse["data"]["action"]),
target_bin=str(parse["data"]["target_bin"]),
item_type=str(vision["data"]["item_type"]),
trace_id=trace_id,
)
steps.append(control)
if not control["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "control_failed"}, warnings=warnings)
# 最终对外结果:只保留业务关键字段(其余证据在 steps 里)
final = {
"item_type": vision["data"]["item_type"],
"target_bin": parse["data"]["target_bin"],
"action": parse["data"]["action"],
"executed": control["data"]["executed"],
}
return PipelineOutput(ok=True, trace_id=trace_id, steps=steps, final=final, warnings=warnings)
async def demo_parallel_extension() -> Dict[str, Any]:
# 并行扩展示例(只演示“并行骨架”):
# - vision 与 parse 在这里没有数据依赖,因此可并行
# - 注意:并行情况下 steps 的 append 顺序可能不稳定(调度导致),不要依赖 steps 的顺序做严格断言
trace_id = uuid.uuid4().hex[:8]
steps: List[Dict[str, Any]] = []
async def _vision():
r = await tool_vision_detect(image_path="sample_apple.jpg", trace_id=trace_id)
steps.append(r)
return r
async def _parse():
r = await tool_parse_instruction(instruction="把苹果放到A格口", trace_id=trace_id)
steps.append(r)
return r
# gather:并行等待两个分支结束;真实项目里可以加入超时、取消策略、关键分支失败就中止等逻辑
v, p = await asyncio.gather(_vision(), _parse())
return {"trace_id": trace_id, "ok": bool(v["ok"] and p["ok"]), "joined": {"vision": v, "parse": p}, "steps": steps}
def main():
# 运行串行主链路 demo
out = asyncio.run(orchestrate_sorting(image_path="apple_001.jpg", instruction="把苹果放到A格口"))
print(json.dumps(out.to_dict(), ensure_ascii=False, indent=2))
# 运行并行扩展示例 demo
joined = asyncio.run(demo_parallel_extension())
print(json.dumps(joined, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
解释与自检要点:
ok_result/err_result:把所有工具的输出统一成同一结构,后续测试与 Join 才能通用。trace_id:orchestrate_sorting()内部生成一次并贯穿全流程,保证“同一次编排”的证据可串联。tool_vision_detect/tool_parse_instruction/tool_arm_control:三段工具函数是“桩”,你们替换为真实工具时只要保持返回结构一致,编排层基本不用动。tool_safety_check:示范“控制前门禁”,把“置信度阈值”变成可验证规则;工业场景里可扩展为权限校验、二次确认、限速等。demo_parallel_extension():演示并行调用的最小骨架,关键在asyncio.gather();真实项目中要为 Join 设计“关键分支/非关键分支”规则。
运行命令(在含 orchestrator_demo.py 的目录执行):
python orchestrator_demo.py
解释与自检要点:
python orchestrator_demo.py:执行示例编排,输出两段 JSON:串行编排结果与并行 Join 示例结果。- 自检:观察输出中
steps是否按顺序记录;失败时是否有error.code;同一次编排的trace_id是否一致。
目标(分组):
- 用你们已做过的工具(或本讲桩工具)把“分拣主链路”跑通,并输出统一的
steps证据链。
步骤(建议):
- 选定 3 个工具:视觉识别 / 指令解析 / 机械臂控制(或你们自选题的 3 个等价工具)。
- 写出串行流程:上游失败立刻停止,返回结构化错误。
- 加入一个门禁工具:
safety_check(或authorize)放在控制工具之前。
-
能打印一份结构化 JSON,含:
ok/trace_id/steps/final。 -
至少能复现 2 类失败:输入为空、置信度不足(或解析失败),并能解释为什么要拒绝控制。
-
AI 适合:生成流程骨架、边界条件清单、测试用例、重复性样板代码。
-
人必须做:字段口径对齐、风险门禁、错误语义、审计与回归验证、删除多余假设。
审计清单(拿到 AI 代码后先检查这 6 项):
-
工具名与字段名是否与你们注册表/工具实现一致(避免“拼错就全崩”)。
-
是否把控制类工具放最后,并在前面做门禁(权限/置信度/二次确认)。
-
失败是否结构化(
error.code/message/detail),是否可定位(trace_id)。 -
并行分支的 Join 是否有明确规则(哪些必须成功,哪些可选)。
-
是否把不可控的自然语言直接传给控制工具(危险做法)。
-
是否有最小测试(至少:正常 2、异常 2)。
-
调用序列匹配率:标注期望的工具调用顺序 vs 实际
steps[*].tool。 -
关键结果匹配率:标注期望字段(如
item_type/target_bin/action)vs 实际final。
标注数据最小格式(JSONL:一行一个样本),保存为 labels.jsonl:
{"id":"case-001","image_path":"apple_001.jpg","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect","parse_instruction","safety_check","arm_control"],"final":{"item_type":"apple","target_bin":"bin_a","action":"place"}}}
{"id":"case-002","image_path":"","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect"],"final":{"reason":"vision_failed"}}}
{"id":"case-003","image_path":"unknown_001.jpg","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect","parse_instruction","safety_check"],"final":{"reason":"confidence_too_low"}}}
解释与自检要点:
id:样本编号,用于输出问题清单时定位。image_path/instruction:编排输入,字段名必须与编排函数参数一致。expect.tool_sequence:期望的工具调用顺序;失败样本通常只包含“走到哪一步就停”。expect.final:只标注关键字段或关键原因(reason),避免把“每个细节”都写死导致无法演进。
把下面代码保存为 evaluate_pipeline.py,与 orchestrator_demo.py、labels.jsonl 放在同一目录后运行。
from __future__ import annotations
# 这是一个“标注数据驱动验证”的最小脚本:
# - 输入:labels.jsonl(每行一个样本,包含 expect.tool_sequence 与 expect.final)
# - 过程:逐条运行编排 orchestrate_sorting(),对比“实际 steps 与 final”是否满足标注期望
# - 输出:总体匹配率 + 每条样本的问题清单(便于定位与回归)
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple
import asyncio
from orchestrator_demo import orchestrate_sorting
def load_jsonl(path: Path) -> List[Dict[str, Any]]:
# 读取 JSONL:一行一个 JSON 对象,便于你不断追加标注样本
lines = path.read_text(encoding="utf-8").splitlines()
out: List[Dict[str, Any]] = []
for ln in lines:
s = ln.strip()
if s:
out.append(json.loads(s))
return out
def tool_sequence(steps: List[Dict[str, Any]]) -> List[str]:
# 从编排输出的 steps 中提取“实际调用序列”(每一步的 tool 字段)
# 串行链路:该序列通常稳定,可直接与标注做严格相等对比
# 并行链路:该序列可能受调度影响不稳定,建议改为校验“必经子序列/相对顺序约束/关键工具集合”
return [str(s.get("tool", "")) for s in steps if s.get("tool")]
def match_subset(expect: Dict[str, Any], actual: Dict[str, Any]) -> Tuple[bool, List[str]]:
# 子集匹配:只校验标注里写明的关键字段
# 好处:系统可演进(final 增加新字段不影响旧标注),也避免把“非关键细节”写死
mismatches: List[str] = []
for k, v in expect.items():
if actual.get(k) != v:
mismatches.append(f"{k}: expect={v!r}, actual={actual.get(k)!r}")
return (len(mismatches) == 0), mismatches
@dataclass
class EvalRow:
# 每条样本的评估结果(用于输出问题清单与课堂验收)
sample_id: str
ok: bool
seq_ok: bool
final_ok: bool
issues: List[str]
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.sample_id,
"ok": self.ok,
"seq_ok": self.seq_ok,
"final_ok": self.final_ok,
"issues": self.issues,
}
async def eval_one(sample: Dict[str, Any]) -> EvalRow:
# 评估单条样本:
# - 跑一次编排得到实际输出
# - 对比标注期望的 tool_sequence 与 final(关键字段子集)
# - 汇总成 issues(问题清单),用于定位错误原因
sample_id = str(sample.get("id", ""))
image_path = str(sample.get("image_path", ""))
instruction = str(sample.get("instruction", ""))
expect = sample.get("expect") or {}
# 执行编排:这里把 sample 的输入字段与 orchestrate_sorting() 的参数一一对齐
out = await orchestrate_sorting(image_path=image_path, instruction=instruction)
out_dict = out.to_dict()
# 1)调用序列匹配:用于检查“顺序控制逻辑是否正确”
actual_seq = tool_sequence(out_dict["steps"])
expect_seq = list(expect.get("tool_sequence") or [])
seq_ok = actual_seq == expect_seq
# 2)关键结果匹配:用于检查“整合后的关键字段是否正确”
expect_final = dict(expect.get("final") or {})
actual_final = dict(out_dict.get("final") or {})
final_ok, mismatches = match_subset(expect_final, actual_final)
# 汇总问题清单:既包含序列不一致,也包含关键字段不一致
issues: List[str] = []
if not seq_ok:
issues.append(f"tool_sequence mismatch: expect={expect_seq}, actual={actual_seq}")
issues.extend(mismatches)
ok = bool(seq_ok and final_ok)
return EvalRow(sample_id=sample_id, ok=ok, seq_ok=seq_ok, final_ok=final_ok, issues=issues)
async def main():
# 主入口:逐条样本评估 → 统计匹配率 → 打印 JSON 报告
samples = load_jsonl(Path("labels.jsonl"))
rows: List[EvalRow] = []
for s in samples:
rows.append(await eval_one(s))
# 统计三类指标:
# - overall_ok_rate:序列与结果都匹配的比例(最严格)
# - sequence_match_rate:只看流程/顺序是否对
# - final_match_rate:只看关键结果是否对
total = len(rows)
ok_cnt = sum(1 for r in rows if r.ok)
seq_cnt = sum(1 for r in rows if r.seq_ok)
final_cnt = sum(1 for r in rows if r.final_ok)
report = {
"total": total,
"overall_ok_rate": 0.0 if total == 0 else ok_cnt / total,
"sequence_match_rate": 0.0 if total == 0 else seq_cnt / total,
"final_match_rate": 0.0 if total == 0 else final_cnt / total,
"rows": [r.to_dict() for r in rows],
}
print(json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
asyncio.run(main())
解释与自检要点:
labels.jsonl:一行一个样本,便于增量追加;读取时跳过空行。tool_sequence():从steps抽取实际工具序列,直接对比标注的expect.tool_sequence。- 注意:当你引入并行分支时,
steps的记录顺序可能受调度影响而不稳定;这时不建议做“全序列严格相等”,更建议验证“必经子序列”(例如主链路顺序)+ “关键工具集合”或“相对顺序约束”。 match_subset():只验证标注里出现的关键字段,避免把整个final写死。EvalRow.issues:输出问题清单的核心字段,要求能定位“哪一项不匹配”。- 指标解释:
sequence_match_rate更关注“流程对不对”,final_match_rate更关注“结果对不对”,两者都重要。
运行命令(在含 3 个文件的目录执行):
python evaluate_pipeline.py
解释与自检要点:
python evaluate_pipeline.py:批量运行标注样本,输出 JSON 报告。- 自检:至少出现
overall_ok_rate/sequence_match_rate/final_match_rate三个数;不匹配时rows[*].issues要可读、可定位。
七、课程思政(融入点)
- 多工具组合逻辑设计是工业智能系统从“单点智能”走向“系统智能”的关键:通过合理编排减少无效调用与误动作,提高产线节拍与稳定性。
- 规范化返回与标注验证体现工程伦理:用证据链与可验证指标保障安全与质量,培养服务产业、面向交付的责任意识。
- 系统思维训练:把复杂问题拆成可控模块(工具),再用编排与验证把模块重新组织成可交付系统。
课后作业(布置)
1)提交自选题场景的工具组合调用逻辑图(含调用顺序、结果整合规则)。
2)提交工具调用代码(AI 生成 + 人工优化版)、标注数据验证结果(含问题清单)。
3)撰写 200 字左右说明:阐述工具调用逻辑设计的思考与优化方向(建议包含:门禁策略、Join 规则、验证指标、下一步改进)。
输出前自检清单(本已执行)
- 标题层级连续(从
#到##/###/####无跳级)。 - 所有代码块与命令块均闭合,且带语言标签。
- 每段代码/命令后均给出解释与自检要点。
- 关键术语口径与既有一致:trace_id、结构化错误、门禁优先、可追溯与可验证。